home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
oscar
/
OscarBuddies.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
27KB
|
823 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
from __future__ import with_statement
# Capability-name fingerprints. OscarBuddy.guess_client() tests a buddy's
# capability set against these with issuperset() to guess the remote client.
aim5_caps = frozenset([
    'buddy_list_transfer',
    'chat_service',
    'voice_chat',
    'file_xfer',
    'direct_im',
    'avatar',
    'add_ins',
])
digsby = frozenset([
    'icq_to_aim',
    'buddy_list_transfer',
    'ichatav_info',
    'xhtml_support',
    'avatar',
    'digsby',
])
pidgin_caps = frozenset([
    'file_xfer',
    'direct_im',
    'avatar',
    'chat_service',
])
aim6_caps = frozenset([
    'direct_im',
    'chat_service',
    'file_xfer',
    'voice_chat',
    'avatar',
])
# Two entries are raw 16-byte capability blocks rather than symbolic names.
icq6_caps = frozenset([
    'icq6_unknown5',
    'icq_unknown1',
    "\xe3b\xc1\xe9\x12\x1aK\x94\xa6&zt\xde$'\r",
    'aim_file_xfer',
    'icq6_unknown1',
    'rtf_support',
    'icq6_unknown3',
    'route_finder',
    '\xb9\x97\x08\xb5:\x92B\x02\xb0i\xf1\xe7W\xbb.\x17',
    'icq6_unknown4',
])
# Client identifiers (as returned by guess_client) that
# OscarBuddy.supports_html_messages treats as HTML-capable.
clients_supporting_html = frozenset([
    'aim60',
    'aim59',
    'purple',
    'icq6',
    'mobile',
    'digsby',
    'miranda-aim',
])
import struct
import time
import datetime
from util.observe import ObservableDict, ObservableProperty
oproperty = ObservableProperty
import common
from common import profile
from common.actions import action
from logging import getLogger
log = getLogger('oscar.buddies')
import util
import oscar
from util import get
from util import Storage
from util import cproperty
from util import FilterDict
from util import preserve_whitespace
from util.callbacks import callsback
from traceback import print_exc
import re
# Matches an opening <body ...> tag; the last group captures the value of an
# optional bgcolor attribute. Raw strings keep the regex escapes literal.
body_pattern = re.compile(r'<(body|BODY).*?((bgcolor|BGCOLOR)=(.*?))? ?>')
# Matches a leading "[...]" in a buddy name (used by OscarBuddy.caps).
blast_group_re = re.compile(r'\[.*\]')
# Plain import: the decompiled "import oscar.capabilities as oscar" rebound the
# name `oscar` to the submodule, which breaks the oscar.unpack(...) and
# oscar.capabilities.by_name/by_bytes lookups used throughout this file.
import oscar.capabilities
# Translated display strings for raw status names; sightly_status falls back
# to the title-cased raw status when a name is missing from this table.
statuses = Storage({
    'free for chat': _('Free for Chat'),
    'busy': _('Occupied'),
})
def _subst_username(buddy):
    """Return the username of the account the buddy belongs to."""
    return buddy.protocol.username


def _subst_date(buddy):
    """Return today's date formatted as MM/DD/YYYY."""
    return datetime.date.today().strftime('%m/%d/%Y')


def _subst_time(buddy):
    """Return the current local time formatted as HH:MM:SS AM/PM."""
    return datetime.datetime.now().strftime('%I:%M:%S %p')


# Placeholder token -> callable(buddy) producing its replacement text.
special_msg_strings = {
    '%n': _subst_username,
    '%d': _subst_date,
    '%t': _subst_time,
}
def magic_string_repl(s, buddy):
    """Expand the placeholders listed in special_msg_strings inside *s*.

    Each replacement callable is invoked with *buddy*, and only when its
    token actually occurs in the string. Returns the expanded string.
    """
    for token, make_value in special_msg_strings.iteritems():
        if token not in s:
            continue
        s = s.replace(token, make_value(buddy))
    return s
# Module-level flag; nothing in the visible part of this file reads it.
# NOTE(review): presumably consulted by other modules -- confirm before removing.
IGNORE_CAPABILITIES = False
def sanitize_aim_html(html):
    """Return *html* with the wrapper markup removed by _remove_html_cruft.

    The background-colour fragment that helper also returns is discarded.
    """
    cleaned = _remove_html_cruft(html)[0]
    return cleaned
def _remove_html_cruft(s):
    """Strip <html>/<body> wrapper markup from an AIM HTML fragment.

    Returns a ``(text, bgcolor)`` tuple. ``text`` is the fragment as unicode
    with a leading ``<html>``, a trailing ``</html>``, every opening body tag
    and every ``</body>`` removed, and ``<br/>`` rewritten to ``<br />``.
    ``bgcolor`` is ``u'bgcolor=VALUE '`` taken from the last body tag that
    carried a bgcolor attribute, or ``u''`` if none did.
    """
    lower = s.lower()
    # The decompiled code assigned None here, which made the regex search
    # below raise for any <html>-wrapped message. Slice the wrapper off.
    if lower.startswith('<html>'):
        s = s[len('<html>'):]
    if lower.endswith('</html>'):
        s = s[:-len('</html>')]
    bgcolor = u''
    match = body_pattern.search(s)
    while match:
        # Cut the whole opening body tag out of the text.
        s = s[:match.start()] + s[match.end():]
        color = match.groups()[-1]
        if color:
            bgcolor = u'bgcolor=' + color + u' '
        match = body_pattern.search(s)
    s = s.replace('</body>', '').replace('</BODY>', '').replace('<br/>', '<br />')
    try:
        return (s.decode('utf-8', 'ignore'), bgcolor)
    except Exception:
        # Best-effort fallback when the text cannot be decoded as UTF-8.
        return (unicode(s), bgcolor)
def aim_to_wxhtml(s):
    """Wrap an AIM HTML fragment in a single-cell, full-width table.

    The fragment is first cleaned by _remove_html_cruft; any bgcolor found on
    its body tag is carried over as an attribute of the table.
    """
    cleaned, bgcolor = _remove_html_cruft(s)
    template = u'<TABLE width=100%% %scellpadding=0 cellspacing=0 border=0><tr><td>%s</td></tr></table>'
    return template % (bgcolor, preserve_whitespace(cleaned))
def make_dgetter(obj):
    """Return a function that looks up fields of *obj* as unicode text.

    The returned ``dgetter(key)`` fetches the value with ``get(obj, key, u'')``,
    maps any falsy value to ``u''`` and decodes non-unicode values with the
    'fuzzy utf8' codec.
    """
    def dgetter(key):
        value = get(obj, key, u'') or u''
        if isinstance(value, unicode):
            return value
        return value.decode('fuzzy utf8')
    return dgetter
def make_pretty_addr(d):
    """Build a display list describing the postal address stored in *d*.

    Reads the 'street', 'city', 'state', 'zip' and 'country' fields through
    make_dgetter. Returns a list of text pieces in display order, preceded by
    a ``[u'(', (map_url, u'map'), u')\\n']`` entry linking to Google Maps.
    Returns an empty list when no address field is set.
    """
    addrkeys = ('street', 'city', 'state', 'zip', 'country')
    field = make_dgetter(d)

    def map_url():
        # Query string is the non-empty fields joined by spaces, url-encoded.
        parts = [field(k) for k in addrkeys]
        query = u' '.join(filter(None, parts))
        return 'http://maps.google.com/maps?q=' + query.encode('utf-8').encode('url')

    country = field('country')
    zipcode = field('zip')
    state = field('state')
    city = field('city')
    street = field('street')

    res = []

    def add(piece):
        # Pieces are added last-to-first, so each one goes to the front.
        res.insert(0, piece)

    if country:
        add(u'\n' + country)
    if zipcode:
        add(zipcode)
    if state:
        if zipcode:
            add(u', ')
        add(state)
    if city:
        if state or zipcode:
            add(u', ')
        add(city)
    if street:
        # Break the line whenever anything follows the street. The previous
        # test "city and state or zip" skipped the break when only a city
        # followed, gluing street and city together.
        if city or state or zipcode:
            add(u'\n')
        add(street)
    if res:
        add([
            u'(',
            (map_url(), u'map'),
            u')\n'])
    return res
class OscarBuddy(common.buddy):
def __init__(self, name, protocol):
    """Create a buddy for *name* on *protocol*.

    The name handed to the base class is lower-cased with spaces removed.
    """
    # State that must exist before the base-class initialiser runs.
    self._status = 'unknown'
    self._idle_time = None
    self._friendly_name = None
    self._avail_msg = None
    self._away_msg = None
    self._user_class = 0
    self.create_time = 0
    self.signon_time = 0

    normalized = name.lower().replace(' ', '')
    common.buddy.__init__(self, normalized, protocol)

    self.account_creation_time = None
    self.online_time = None
    self.user_status_icq = 'offline'
    self.external_ip_icq = 0
    self._dc_info = util.Storage()
    self._capabilities = []
    self.userinfo = { }
    # A stored False is normalised to "no profile".
    if self._profile is False:
        self._profile = None
    self._away_updated = 0
    self._mystery_updated = 0
    self._waiting_for_presence = True

def __hash__(self):
    """Hash exactly like the base buddy class."""
    base_hash = common.buddy.__hash__
    return base_hash(self)
@property
def sightly_status(self):
    """Human-readable status text for this buddy.

    Mobile buddies always read 'Mobile'. Otherwise the computed status (AIM)
    or the raw stored status (any other service) is looked up in `statuses`,
    falling back to its title-cased form.
    """
    if self.mobile:
        return _('Mobile')
    if self.service == 'aim':
        return statuses.get(self.status, self.status.title())
    return statuses.get(self._status, self._status.title())
def pretty_profile(self):
    """Build the structured profile shown for this buddy.

    AIM buddies: returns a list holding the profile rendered by aim_to_wxhtml
    (or a rendered 'No Profile' placeholder), optionally followed by an odict
    with a link to the buddy's aimpages.com profile page.

    Other buddies (ICQ): returns an odict of labelled fields (name, personal
    details, free-text profile, home/work address, work details) with
    'sepN' entries between populated sections, ending with a link to the
    buddy's people.icq.com page.
    """
    # The decompiler rendered these imports as "odict = odict" / "pref = pref",
    # which raise UnboundLocalError; restore the from-imports.
    from util import odict

    if self.service == 'aim':
        from common import pref
        # NOTE(review): the decompiled code stored None when a profile was
        # present; rendering the profile itself is the assumed intent -- confirm.
        if self.profile:
            prof = [aim_to_wxhtml(self.profile)]
        else:
            prof = [aim_to_wxhtml(u'No Profile')]
        if pref('infobox.aim.show_profile_link', True):
            linkage = odict()
            linkage['space'] = 4
            url = ''.join([
                'http://www.aimpages.com/',
                self.name,
                '/profile.html'])
            linkage['Profile URL:'] = [
                '\n',
                (url, url)]
            prof += [linkage]
        return prof

    # `ok` records whether the section written so far has content, i.e.
    # whether a separator is needed before the next section.
    ok = False
    profile = odict()
    bget = make_dgetter(self)

    first = bget('first')
    last = bget('last')
    if first or last:
        # bget maps None to u'', so a missing half cannot break the join.
        profile['Full Name:'] = u' '.join(filter(None, [first, last]))
        ok = True

    personal = getattr(self, 'personal', { })
    if personal:
        pget = make_dgetter(personal)
        homeaddr = make_pretty_addr(personal)
    else:
        pget = lambda s: ''
        homeaddr = ''
    for key in 'gender birthday'.split():
        if hasattr(personal, key) and getattr(personal, key, False):
            profile[key.capitalize() + ':'] = pget(key)
            ok = True
    p_web = pget('website')
    if p_web:
        profile['Website:'] = (p_web, p_web)
        ok = True

    prof = bget('profile')
    if prof:
        if ok:
            profile['sep1'] = 4
        # bget always returns unicode, so this concatenation cannot fail; the
        # old try/except left `profstr` unbound on its error path.
        profile['Additional Information:'] = u'\n' + prof
        ok = True

    if homeaddr:
        if ok:
            profile['sep2'] = 4
        profile['Home Address:'] = homeaddr
        ok = True

    work = getattr(self, 'work', { })
    if work:
        wget = make_dgetter(work)
        workaddr = make_pretty_addr(work)
    else:
        wget = lambda s: ''
        workaddr = ''
    if workaddr:
        if ok:
            profile['sep3'] = 4
        profile['Work Address:'] = workaddr
        ok = True

    if ok:
        profile['sep4'] = 4
        ok = False
    for key in 'company department position'.split():
        if hasattr(work, key) and getattr(work, key, False):
            profile[key.capitalize() + ':'] = wget(key)
            ok = True
    w_web = wget('website')
    if w_web:
        profile['Work Website:'] = (w_web, w_web)
        ok = True
    if ok:
        ok = False
        profile['sep5'] = 4

    url = ''.join([
        u'http://people.icq.com/people/about_me.php?uin=',
        self.name])
    profile['Profile URL:'] = [
        '\n',
        (url, url)]
    return profile
pretty_profile = property(pretty_profile)
def service(self):
    """Return 'icq' when the buddy name is all digits, otherwise 'aim'.

    The decompiled code returned None for numeric names, so the `icq`
    property below (which compares against 'icq') could never be True.
    """
    if self.name.isdigit():
        return 'icq'
    return 'aim'
service = property(service)
def icq(self):
    """True when this buddy's service is 'icq'."""
    return self.service == 'icq'
icq = property(icq)
def update(self, userinfo):
    """Apply a userinfo mapping received for this buddy.

    String keys are applied as attributes: 'status' goes through the status
    property ('online' is stored as 'available', and it is ignored for the
    account's own buddy), 'avail_msg' is parsed by _set_avail_msg without
    notifying, anything else is set with setattr. Non-string keys are stored
    in self.userinfo. Observers are notified once per applied attribute after
    everything has been applied.
    """
    self._idle_time = None
    notifyattrs = []
    notify_append = notifyattrs.append
    is_self_buddy = self.protocol.self_buddy is self

    # Restored "with" block: the decompiled code only called __enter__() on
    # the frozen() context manager and never exited it.
    with self.frozen():
        for k, v in userinfo.iteritems():
            if not isinstance(k, basestring):
                self.userinfo[k] = v
                continue
            if k == 'status':
                if is_self_buddy:
                    continue
                if v == 'online':
                    v = 'available'
                self.status = v
                notify_append('status')
            elif k == 'avail_msg':
                self._set_avail_msg(v, False)
                notify_append('status_message')
            else:
                try:
                    setattr(self, k, v)
                    notify_append(k)
                except AttributeError:
                    # Read-only or unknown attribute: report it, keep going.
                    print_exc()

    # NOTE(review): placed after the frozen block, following the decompiled
    # statement order -- confirm.
    if self._status != 'away':
        self._waiting_for_presence = False
    notify = self.notify
    for attr in notifyattrs:
        notify(attr)
def _get_status_orb(self):
    """Return the status-orb name for this buddy.

    Idle wins over away; otherwise the decision is delegated to
    common.Buddy.get_status_orb.
    """
    if self.idle:
        return 'idle'
    if self.away:
        return 'away'
    # The decompiler rendered this import as "get_status_orb = get_status_orb",
    # which raises UnboundLocalError; restore the from-import.
    from common.Buddy import get_status_orb
    return get_status_orb(self)
status_orb = oproperty(_get_status_orb, observe = 'status')
def get_profile(self):
    """Return the stored profile with placeholders expanded, or None."""
    raw = self._profile
    if raw is None:
        return raw
    return magic_string_repl(raw, self)
def set_profile(self, profile):
    """Store the raw profile text."""
    self._profile = profile
profile = property(get_profile, set_profile, doc = "This buddy's AIM profile or ICQ 'about me'.")
_profile_updated = cproperty(0)
def _net_timestamp(netint):
    """Convert a timestamp as delivered by the network into a number.

    A 4-byte string is unpacked as a network-order unsigned 32-bit integer;
    any other string is logged and mapped to None; non-strings pass through.
    NOTE(review): the '!I' layout mirrors _set_online_time in this class; the
    decompiled setters had lost this expression (they stored None) -- confirm.
    """
    if not isinstance(netint, basestring):
        return netint
    if len(netint) == 4:
        return struct.unpack('!I', netint)[0]
    log.warning('unexpected packed timestamp %r', netint)
    return None
_net_timestamp = staticmethod(_net_timestamp)
def set_profile_updated(self, netint):
    """Record when the buddy's profile last changed."""
    self._profile_updated = self._net_timestamp(netint)
profile_updated = property((lambda b: b._profile_updated), set_profile_updated)
def set_away_updated(self, netint):
    """Record when the buddy's away message last changed."""
    self._away_updated = self._net_timestamp(netint)
away_updated = property((lambda b: b._away_updated), set_away_updated)
def set_mystery_updated(self, netint):
    """Record the 'mystery' update timestamp."""
    self._mystery_updated = self._net_timestamp(netint)
mystery_updated = property((lambda b: b._mystery_updated), set_mystery_updated)
@property
def sms(self):
    """True when the buddy name begins with '+'."""
    name = self.name
    return name.startswith('+')
_profile = cproperty(None)
def request_info(self, profile, away):
    """Ask the protocol for this buddy's profile and/or away message.

    Does nothing when neither *profile* nor *away* is requested.
    """
    if profile or away:
        self.protocol.get_buddy_info(self, profile = profile, away = away)
def get_nice_name(self):
    """Return the stored nice name, or the plain name if none was set."""
    fallback = self.name
    return getattr(self, '_nice_name', fallback)
def set_nice_name(self, name):
    """Store the nice name."""
    self._nice_name = name
nice_name = property(get_nice_name, set_nice_name)
def _set_capabilities(self, newval):
    """Parse a packed capability blob and store it as a list of blocks.

    The blob is consumed as pairs of 'Q' fields, each pair re-packed with
    '!QQ'. Ignored for the account's own buddy. A lone 'chat_service'
    capability is treated as a dummy update and does not overwrite
    capabilities already known.
    """
    if self.protocol.self_buddy is self:
        return None
    fmt = (('caphi', 'Q'), ('caplo', 'Q'))
    parsed = []
    while newval:
        (hi, lo, newval) = oscar.unpack(fmt, newval)
        parsed.append(struct.pack('!QQ', hi, lo))
    if self._capabilities and len(parsed) == 1 and parsed[0] == oscar.capabilities.by_name['chat_service']:
        log.info('Received dummy capabilities for %r, not setting them', self.name)
        return None
    self._capabilities = parsed
    log.debug("%r's caps are: %r", self, self.pretty_caps)
capabilities = ObservableProperty((lambda self: self._capabilities), _set_capabilities, observe = '_capabilities')
@property
def pretty_caps(self):
    """Capabilities as symbolic names; unknown blocks stay as raw bytes."""
    return [oscar.capabilities.by_bytes.get(block, block) for block in self._capabilities]
def caps(self):
    """Return the protocol's capability list as it applies to this buddy.

    Starts from a copy of self.protocol.caps and appends the BOT capability
    when the normalised name is in the protocol's bot list or starts with a
    "[...]" group.
    NOTE(review): the decompiled code also built an unused list of this
    buddy's OSCAR capability names; whatever consumed it was lost -- confirm.
    """
    # The decompiler rendered this import as "digsbycaps = caps", which fails
    # with NameError; restore the from-import.
    from common import caps as digsbycaps
    protocaps = list(self.protocol.caps)
    stripped = _lowerstrip(self.name)
    if stripped in self.protocol.bots or blast_group_re.match(stripped):
        protocaps.append(digsbycaps.BOT)
    return protocaps
caps = property(caps)
@property
def supports_html_messages(self):
    """Whether HTML-formatted messages should be sent to this buddy.

    Never for a 'purple' client when our own account is ICQ. Otherwise an
    explicit 'xhtml_support' capability means yes, 'rtf_support' means no,
    and failing both the guessed client decides.
    """
    client = self.guess_client()
    if client == 'purple' and self.protocol.self_buddy.icq:
        return False
    caps = set(self.pretty_caps)
    for cap_name, verdict in (('xhtml_support', True), ('rtf_support', False)):
        if cap_name in caps:
            return verdict
    return client in clients_supporting_html
def guess_client(self):
    """Guess the buddy's client software from its capability set.

    Returns one of 'icq6', 'miranda-icq', 'miranda-aim', 'aim59', 'digsby',
    'aim60', 'purple', 'mobile' or 'unknown'. Checks run in that order and
    the first match wins.
    """
    caps = set(self.pretty_caps)
    if caps.issuperset(icq6_caps):
        return 'icq6'
    # Restored generator expression; the decompiler emitted it as an invalid
    # "lambda .0:" construct.
    elif any(x.startswith('Miranda') for x in caps):
        if caps.issuperset(('icq_unknown1', 'icq6_unknown4')):
            return 'miranda-icq'
        else:
            return 'miranda-aim'
    elif caps.issuperset(aim5_caps):
        return 'aim59'
    elif 'digsby' in caps:
        return 'digsby'
    elif caps.issuperset(aim6_caps) or 'aim6_unknown1' in caps:
        return 'aim60'
    elif caps.issuperset(pidgin_caps):
        return 'purple'
    elif self.mobile or self.sms:
        return 'mobile'
    else:
        return 'unknown'
def _set_dc_info(self, newval):
    """Parse and store the buddy's 'dc_info' structure.

    Any bytes left over after parsing are logged in hex.
    """
    (dc_info, data) = oscar.unpack((('_', 'dc_info'),), newval)
    if data:
        # Use the module logger, like the rest of this class, instead of
        # printing to stdout.
        log.info('extra data on dc_info for %s: %s', self.name, util.to_hex(data))
    self._dc_info = dc_info
dc_info = ObservableProperty((lambda self: self._dc_info), _set_dc_info, observe = '_dc_info')
def _set_user_class(self, newval):
    """Store the user class, unpacked from a network-order unsigned short."""
    self._user_class = struct.unpack('!H', newval)[0]
user_class = ObservableProperty((lambda self: self._user_class), _set_user_class, observe = '_user_class')
def _set_avail_msg(self, newval, notify = True):
    """Parse a packed 'tflv_list' and update icon hash / available message.

    Entry type 1 is forwarded to setnotifyif('icon_hash', ...). Entry type 2
    carries the available message: an empty value clears it, otherwise the
    message is decoded with the 'fuzzy utf-8' codec (skipped for the
    account's own buddy). Observers of 'avail_msg' are notified only when
    *notify* is true and a type 2 entry was present.
    """
    previous = self._avail_msg
    by_type = { }
    (entries, newval) = oscar.unpack((('values', 'tflv_list'),), newval)
    for entry in entries:
        by_type[entry.t] = entry.v
    if 1 in by_type:
        self.setnotifyif('icon_hash', by_type[1])
    if 2 not in by_type:
        notify = False
    elif len(by_type[2]) > 0:
        fmt = (('msglen', 'H'), ('msg', 's', 'msglen'), ('numtlvs', 'H'), ('tlvs', 'tlv_list', 'numtlvs'))
        (__, msg, __, tlvs, by_type[2]) = oscar.unpack(fmt, by_type[2])
        if self is self.protocol.self_buddy:
            return None
        codec_name = ' '.join(['fuzzy', 'utf-8'])
        self._avail_msg = msg.decode(codec_name)
        self._cached_status_message = None
    else:
        self._avail_msg = None
    if notify:
        self.notify('avail_msg', previous, self._avail_msg)
def _get_avail_msg(self):
    """Return the stored available message."""
    return self._avail_msg
avail_msg = property(_get_avail_msg, _set_avail_msg)
def __repr__(self):
    """Debug representation showing the normalised buddy name."""
    shown = _lowerstrip(self.name)
    return '<OscarBuddy %s>' % shown
def get_idle(self):
    """Return the stored idle time."""
    return self._idle_time
def set_idle(self, val):
    """Store the idle time as given."""
    self._idle_time = val
idle = ObservableProperty(get_idle, set_idle, observe = '_idle_time')
def set_idle_time(self, netidle):
    """Store the idle time from a network value.

    A string is unpacked as a network-order unsigned short, multiplied by 60
    and subtracted from the current time. An int or None is stored as is.
    Anything else is logged and ignored.
    """
    if netidle is None:
        self._idle_time = None
        return
    if isinstance(netidle, basestring):
        units = struct.unpack('!H', netidle)[0]
        self._idle_time = int(time.time() - 60 * units)
        return
    if isinstance(netidle, int):
        self._idle_time = netidle
        return
    log.warning('set_idle_time received %r', netidle)
def get_idle_time(self):
    """Return the idle time as an int, or None when not idle."""
    stored = self._idle_time
    if stored is None:
        return None
    return int(stored)
idle_time = property(get_idle_time, set_idle_time, None, 'Set by the network.')
def get_status(self):
    """Compute the status shown for this buddy.

    Offline buddies read 'mobile' or 'offline'. An unknown raw status, or one
    still waiting for presence, reads 'unknown'. After that away wins over
    idle, and everything else is 'available'.
    """
    if self._status == 'offline':
        return 'mobile' if self.mobile else 'offline'
    if self._status == 'unknown' or self._waiting_for_presence:
        return 'unknown'
    if self.away or self._status == 'away':
        return 'away'
    if self.idle or self._status == 'idle':
        return 'idle'
    return 'available'
def set_status(self, newval):
    """Store the raw status."""
    self._status = newval
status = ObservableProperty(get_status, set_status, observe = '_status')
def set_away_msg(self, away_msg):
    """Store the away message and stop waiting for presence.

    Raises TypeError (message: the offending type) unless *away_msg* is a
    string or None.
    """
    acceptable = away_msg is None or isinstance(away_msg, basestring)
    if not acceptable:
        raise TypeError(str(type(away_msg)))
    self._waiting_for_presence = False
    self._away_msg = away_msg
def get_away_msg(self):
    """Return the stored away message."""
    return self._away_msg
away_msg = property(get_away_msg, set_away_msg)
def get_online(self):
    """True unless the raw status is 'offline' or 'unknown'.

    The decompiled code had lost the 'offline' half of this test (it was
    reduced to "if ...: pass"), so offline buddies were reported as online.
    """
    return self._status not in ('offline', 'unknown')
online = ObservableProperty(get_online, observe = '_status')
def _away(self):
    """True when the buddy is online and marked away.

    With a non-zero user class, bit 32 decides; otherwise the raw status is
    compared with 'away'. The decompiled code had dropped the online check.
    """
    if self.user_class:
        v = bool(self.user_class & 32)
    else:
        v = self._status == 'away'
    return bool(self.online and v)
away = ObservableProperty(_away, observe = 'user_class')
def direct_connect(self):
    """Ask the protocol to open a direct connection to this buddy."""
    protocol = self.protocol
    return protocol.direct_connect(self)
def _get_invisible(self):
    """True when bit 256 of the user class is set."""
    flagged = self.user_class & 256
    return bool(flagged)
def _set_invisible(self, invis):
    """Setter that accepts and ignores the value."""
    return None
invisible = ObservableProperty(_get_invisible, _set_invisible, observe = 'user_class')
def _get_stripped_msg(self):
    """Return the status message with HTML markup stripped.

    The result is decoded with the 'xml' codec when possible. The decompiled
    code always stripped an empty string (the "status_message or ''"
    expression had been reduced to "if ...: pass"), so the result was always
    empty.
    """
    msg = util.strip_html2(self.status_message or '')
    try:
        return msg.decode('xml')
    except Exception:
        # Best effort: fall back to the stripped text as is.
        return msg
stripped_msg = ObservableProperty(_get_stripped_msg, observe = 'status_message')
def get_status_message(self):
    """Return the status message rendered by aim_to_wxhtml, or None.

    A non-blank away message is used while the buddy is away; otherwise a
    non-blank available message (encoded with the 'xml' codec first).
    Placeholders are expanded in both cases.
    """
    if self.away and self.away_msg and self.away_msg.strip():
        expanded = magic_string_repl(self.away_msg, self)
        return aim_to_wxhtml(expanded)
    if self.avail_msg and self.avail_msg.strip():
        expanded = magic_string_repl(self.avail_msg.encode('xml'), self)
        return aim_to_wxhtml(expanded)
    return None
def set_status_message(self, val):
    """Set both the away and the available message to *val*.

    Raises TypeError (message: the offending type) unless *val* is a string
    or None.
    """
    if val is not None and not isinstance(val, basestring):
        raise TypeError(str(type(val)))
    self.away_msg = val
    self._avail_msg = val
status_message = ObservableProperty(get_status_message, set_status_message, observe = [
    'away_msg',
    '_avail_msg'])
def _mobile(self):
    """True when user-class bit 128 is set and this is not our own buddy.

    The decompiled code had reduced the bit test to "if ...: pass", so every
    buddy other than the account's own was reported as mobile.
    """
    user_class = self.user_class
    if not user_class or (user_class & 128) == 0:
        return False
    return bool(self.protocol.self_buddy != self)
mobile = ObservableProperty(_mobile, observe = 'user_class')
def _set_online_time(self, val):
    """Store the online time.

    A 4-character str is unpacked as a network-order unsigned 32-bit integer
    and subtracted from the current time; any other value is stored as is.
    """
    is_packed = isinstance(val, str) and len(val) == 4
    if is_packed:
        val = time.time() - struct.unpack('!I', val)[0]
    self._online_time = val
def _get_online_time(self):
    """Return the stored online time."""
    return self._online_time
online_time = ObservableProperty(_get_online_time, _set_online_time, observe = '_online_time')
def __str__(self):
    """The buddy's name."""
    name = self.name
    return name
def __cmp__(self, other):
    """Order buddies by normalised name; other types compare as -1."""
    if isinstance(other, self.__class__):
        mine = _lowerstrip(self.name)
        theirs = _lowerstrip(other.name)
        return cmp(mine, theirs)
    return -1
def get_buddy_icon(self):
    """Ask the protocol to fetch this buddy's icon; returns nothing."""
    protocol = self.protocol
    protocol.get_buddy_icon(self.name)
@property
def blocked(self):
    """Whether the normalised name is on the protocol's block list.

    ICQ protocols are checked against ignore_list, all others against
    block_list.
    """
    if self.protocol.icq:
        return _lowerstrip(self.name) in self.protocol.ignore_list
    return _lowerstrip(self.name) in self.protocol.block_list
@property
def pending_auth(self):
    """Always False."""
    return False
def get_remote_alias(self):
    """Return the alias supplied by the network, or None.

    Prefers 'nick'; otherwise joins 'first' and 'last'. Surrounding
    whitespace is stripped and an empty result becomes None. The decompiled
    code had reduced the final "strip() or None" to an unconditional None.
    """
    a = getattr(self, 'nick', None)
    if not a:
        # Treat an unset (None) name half like an empty one so the alias
        # never contains the text "None".
        first = getattr(self, 'first', '') or ''
        last = getattr(self, 'last', '') or ''
        a = '%s %s' % (first, last)
    return a.strip() or None
remote_alias = oproperty(get_remote_alias, observe = [
    'first',
    'last',
    'nick'])
# Fields declared via cproperty; pretty_profile and get_remote_alias read them.
first = cproperty()
last = cproperty()
nick = cproperty()
personal = cproperty()
work = cproperty()
def get_alias(self):
    """Return the display alias for this buddy as unicode.

    A non-blank alias from profile.get_contact_info wins. Otherwise the first
    truthy value among local_alias, remote_alias, _friendly_name and
    nice_name is used, falling back to the plain name. The result is decoded
    with the 'fuzzy utf-8' codec.
    """
    stored = profile.get_contact_info(self, 'alias')
    if stored and stored.strip():
        chosen = stored
    else:
        chosen = None
        for attr in ('local_alias', 'remote_alias', '_friendly_name', 'nice_name'):
            chosen = getattr(self, attr, None)
            if chosen:
                break
        else:
            # No candidate was truthy.
            chosen = self.name
    return chosen.decode('fuzzy utf-8')
alias = oproperty(get_alias, observe = [
    'local_alias',
    'remote_alias',
    'nice_name'])
@callsback
def block(self, set_blocked = True, callback = None):
    """Block (or unblock) this buddy.

    ICQ protocols use the protocol's ignore(); all others use block().
    """
    if self.protocol.icq:
        return self.protocol.ignore(self, set_blocked, callback = callback)
    return self.protocol.block(self, set_blocked, callback = callback)
@callsback
def unblock(self, callback = None):
    """Shortcut for block(False)."""
    return self.block(False, callback = callback)
def permit(self, set_allowed = True):
    """Forward to the protocol's permit() for this buddy."""
    protocol = self.protocol
    return protocol.permit(self, set_allowed)
def unpermit(self):
    """Shortcut for permit(False)."""
    return self.permit(False)
def warn(self, anon = True):
    """Forward to the protocol's warn() for this buddy."""
    protocol = self.protocol
    return protocol.warn(self, anon)
class OscarBuddies(ObservableDict, FilterDict):
    """Dictionary of OscarBuddy objects that creates buddies on first access.

    Keys pass through _lowerstrip (handed to FilterDict as the key filter).
    """

    def __init__(self, protocol):
        """Initialise both base classes and remember the owning protocol."""
        ObservableDict.__init__(self)
        FilterDict.__init__(self, _lowerstrip)
        self.protocol = protocol

    def __getitem__(self, k):
        """Return the buddy for *k*, creating and storing it if missing."""
        try:
            return FilterDict.__getitem__(self, k)
        except KeyError:
            filtered_key = self.ff(k)
            fresh = OscarBuddy(k, self.protocol)
            return self.setdefault(filtered_key, fresh)

    def update_buddies(self, infos):
        """Apply each info object to the buddy named by its .name."""
        for userinfo in infos:
            buddy = self[userinfo.name]
            buddy.update(userinfo)
@util.memoize
def _lowerstrip(name):
    """Normalise a screen name: str(), lower-case, all spaces removed."""
    lowered = str(name).lower()
    return ''.join(lowered.split(' '))